// Copyright (c) 2020-present, INSPUR Co, Ltd. All rights reserved.
// This source code is licensed under Apache 2.0 License.

#pragma once

#include "pure_mem/encoded_version_node.h"
#include "pure_mem/key_index/art/rowex_tree.h"
#include "pure_mem/key_index/art/rowex_tree_fullkey.h"
#include "pure_mem/key_index/tree_void_ref.h"
#include "pure_mem/mvcc_key.h"
#include "pure_mem/uncoded_version_node.h"
#include "rocksdb/slice.h"

namespace rocksdb {
/**
 * Identifies which algorithm implements the underlying ITree.
 */
enum UserKeyIndexType {
  // ART + ROWEX
  ART_ROWEX = 1,
  // ART + ROWEX, with keys stored entirely inside the ART
  ART_ROWEX_FULLKEY = 2,
  // B-tree
  B_TREE = 3,
  // ART without any concurrency mechanism, based on the libart source code
  ART_SINGLE_THREAD = 4,
};

/**
 * Generic interface for looking up user (business) keys.
 *
 * The constructor builds the ITree implementation selected by
 * UserKeyIndexType and the destructor releases it. The lookup methods
 * declared here are default stubs that return nullptr.
 * NOTE(review): presumably concrete subclasses override the lookup
 * methods — confirm against the derived classes.
 */
class IUserKeyIndex {
public:
  /**
   * Builds the underlying tree for the requested index type.
   *
   * @param type which ITree implementation to construct; only ART_ROWEX and
   *             ART_ROWEX_FULLKEY are handled, every other value trips the
   *             assert below.
   * @param log  logger kept in logger_ and handed to RowexTreeFullKey.
   */
  IUserKeyIndex(UserKeyIndexType type, Logger* log)
      : tree_(nullptr), type_(type), logger_(log) {
    switch (type) {
    case ART_ROWEX:
      tree_ = new RowexTree(loadKeyFromTreeVoidRef);
      break;
    case ART_ROWEX_FULLKEY:
      tree_ = new RowexTreeFullKey(loadKeyFromTreeVoidRef, logger_);
      break;
    default:
      // Unsupported type. When asserts are compiled out, tree_ stays nullptr
      // (set in the initializer list) instead of holding an indeterminate
      // value.
      assert(0 == 1);
      break;
    }
  }

  // tree_ is owned through a raw pointer and deleted in the destructor, so a
  // copy would make two objects delete the same tree.
  IUserKeyIndex(const IUserKeyIndex &) = delete;
  IUserKeyIndex &operator=(const IUserKeyIndex &) = delete;

  /**
   * The destructor is required: it frees the memory of the whole tree.
   */
  virtual ~IUserKeyIndex() {
    switch (type_) {
    case ART_ROWEX:
      delete dynamic_cast<RowexTree *>(tree_);
      break;

    case ART_ROWEX_FULLKEY:
      delete dynamic_cast<RowexTreeFullKey *>(tree_);
      break;

    default:
      // Mirrors the constructor: no tree was built for this type.
      assert(0 == 1);
      break;
    }
    tree_ = nullptr;
  }

  /**
   * Callback passed to the underlying ITree object as its LoadKeyFromValue
   * function: resolves the key of a leaf node / TreeVoidRef object.
   *
   * @param ref leaf value, treated as an ITreeVoidRef*; when it is nullptr
   *            the function returns without touching key.
   * @param key output; receives the data pointer and size of the key
   *            returned by parseKeyFromTreeVoidRef().
   */
  static void loadKeyFromTreeVoidRef(void *ref, rocksdb::Slice &key) {
    if (ref == nullptr)
      return;
    ITreeVoidRef *curRef = static_cast<ITreeVoidRef *>(ref);
    Slice ret = IUserKeyIndex::parseKeyFromTreeVoidRef(curRef);
    key.data_ = ret.data_;
    key.size_ = ret.size_;
  }

  /**
   * Resolves the user key of a leaf node / TreeVoidRef object.
   *
   * Starting at ref, follows Next() until a reference with a non-null
   * content list is found, then reads the key from that VersionNode.
   *
   * @param ref first reference to inspect; may be nullptr.
   * @return the user key, or an empty Slice when no reference in the chain
   *         has content, or when the node's type is unknown or does not
   *         match its dynamic type.
   */
  static Slice parseKeyFromTreeVoidRef(ITreeVoidRef *ref) {
    // Find the first reference in the chain that actually holds a node.
    VersionNode *storeNode = nullptr;
    for (; ref != nullptr; ref = ref->Next()) {
      storeNode = ref->getContentList();
      if (storeNode != nullptr)
        break;
    }
    if (storeNode == nullptr)
      return Slice();

    MvccKey mk;
    switch (storeNode->GetNodeType()) {
      case VersionNode::NodeType::ENCODED: {
        EncodedVersionNode *encoded = dynamic_cast<EncodedVersionNode *>(storeNode);
        // The reported node type disagrees with the dynamic type: return an
        // empty key rather than dereferencing a null pointer.
        if (encoded == nullptr)
          return Slice();
        mk.parseKey(GetLengthPrefixedSlice(encoded->Key()));
        break;
      }
      case VersionNode::NodeType::UNCODED: {
        UncodedVersionNode *uncoded = dynamic_cast<UncodedVersionNode *>(storeNode);
        if (uncoded == nullptr)
          return Slice();
        mk = uncoded->GetMvccKey();
        break;
      }
      default:
        // Unknown node type: no key can be extracted.
        return Slice();
    }
    return mk.userKey_;
  }

  // Intended contract: get the TreeVoidRef object matching userKey in the ART
  // tree; if none exists, create a new one and put it into the tree.
  // This base implementation is a stub that returns nullptr.
  virtual ITreeVoidRef *getTreeVoidRef(const Slice &userKey, const VersionNode *node) {
    return nullptr;
  }
  // Intended contract: get a TreeVoidRef object whose key is equal to or
  // greater than userKey; when no object matches, return nullptr.
  // This base implementation is a stub that returns nullptr.
  virtual ITreeVoidRef *getGETreeVoidRef(const Slice &userKey) { return nullptr; }
  // NOTE(review): no contract is documented for this lookup; by its name it
  // looks like a counterpart of getGETreeVoidRef — confirm against the
  // overriding classes. This base implementation returns nullptr.
  virtual ITreeVoidRef *getGLTreeVoidRef(const Slice &userKey) { return nullptr; }

  // Stubs returning nullptr in this base class.
  // NOTE(review): presumably the first and last TreeVoidRef of the index —
  // confirm against the overriding classes.
  virtual ITreeVoidRef *head() { return nullptr; }
  virtual ITreeVoidRef *tail() { return nullptr; }

protected:
  // ART tree object pointer, built while the index is constructed and
  // deleted in the destructor; nullptr when the type is unsupported.
  ITree *tree_;
  // Index type given at construction; selects the concrete type to delete.
  UserKeyIndexType type_;
  // Logger given at construction (not owned: never deleted here).
  Logger* logger_;
};

} // namespace rocksdb
